/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.sdk.io;



import static com.bff.gaia.unified.sdk.io.fs.ResolveOptions.StandardResolveOptions.RESOLVE_FILE;

import static com.bff.gaia.unified.sdk.transforms.Contextful.fn;

import static com.bff.gaia.unified.vendor.guava.com.google.common.base.Preconditions.checkArgument;

import static com.bff.gaia.unified.vendor.guava.com.google.common.base.Preconditions.checkState;



import com.google.auto.value.AutoValue;

import java.io.IOException;

import java.io.InputStream;

import java.io.Serializable;

import java.nio.channels.Channels;

import java.nio.channels.ReadableByteChannel;

import java.nio.channels.SeekableByteChannel;

import java.nio.channels.WritableByteChannel;

import java.nio.charset.StandardCharsets;

import java.text.DecimalFormat;

import java.util.Collection;

import java.util.List;

import javax.annotation.Nullable;

import com.bff.gaia.unified.sdk.annotations.Experimental;

import com.bff.gaia.unified.sdk.coders.CannotProvideCoderException;

import com.bff.gaia.unified.sdk.coders.Coder;

import com.bff.gaia.unified.sdk.coders.StringUtf8Coder;

import com.bff.gaia.unified.sdk.coders.VoidCoder;

import com.bff.gaia.unified.sdk.io.fs.EmptyMatchTreatment;

import com.bff.gaia.unified.sdk.io.fs.MatchResult;

import com.bff.gaia.unified.sdk.io.fs.ResourceId;

import com.bff.gaia.unified.sdk.options.ValueProvider;

import com.bff.gaia.unified.sdk.options.ValueProvider.StaticValueProvider;

import com.bff.gaia.unified.sdk.transforms.Contextful;

import com.bff.gaia.unified.sdk.transforms.Contextful.Fn;

import com.bff.gaia.unified.sdk.transforms.Create;

import com.bff.gaia.unified.sdk.transforms.DoFn;

import com.bff.gaia.unified.sdk.transforms.GroupByKey;

import com.bff.gaia.unified.sdk.transforms.PTransform;

import com.bff.gaia.unified.sdk.transforms.ParDo;

import com.bff.gaia.unified.sdk.transforms.Requirements;

import com.bff.gaia.unified.sdk.transforms.Reshuffle;

import com.bff.gaia.unified.sdk.transforms.SerializableFunction;

import com.bff.gaia.unified.sdk.transforms.SerializableFunctions;

import com.bff.gaia.unified.sdk.transforms.Values;

import com.bff.gaia.unified.sdk.transforms.Watch;

import com.bff.gaia.unified.sdk.transforms.Watch.Growth.PollFn;

import com.bff.gaia.unified.sdk.transforms.Watch.Growth.TerminationCondition;

import com.bff.gaia.unified.sdk.transforms.display.DisplayData;

import com.bff.gaia.unified.sdk.transforms.display.HasDisplayData;

import com.bff.gaia.unified.sdk.transforms.windowing.BoundedWindow;

import com.bff.gaia.unified.sdk.transforms.windowing.GlobalWindow;

import com.bff.gaia.unified.sdk.transforms.windowing.IntervalWindow;

import com.bff.gaia.unified.sdk.transforms.windowing.PaneInfo;

import com.bff.gaia.unified.sdk.util.StreamUtils;

import com.bff.gaia.unified.sdk.values.PBegin;

import com.bff.gaia.unified.sdk.values.PCollection;

import com.bff.gaia.unified.sdk.values.PCollectionView;

import com.bff.gaia.unified.sdk.values.TypeDescriptor;

import com.bff.gaia.unified.sdk.values.TypeDescriptors;

import com.bff.gaia.unified.vendor.guava.com.google.common.annotations.VisibleForTesting;

import com.bff.gaia.unified.vendor.guava.com.google.common.base.MoreObjects;

import com.bff.gaia.unified.vendor.guava.com.google.common.base.Objects;

import com.bff.gaia.unified.vendor.guava.com.google.common.collect.Lists;

import org.joda.time.Duration;

import org.joda.time.Instant;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;



/**

 * General-purpose transforms for working with files: listing files (matching), reading and writing.

 *

 * <h2>Matching filepatterns</h2>

 *

 * <p>{@link #match} and {@link #matchAll} match filepatterns (respectively either a single

 * filepattern or a {@link PCollection} thereof) and return the files that match them as {@link

 * PCollection PCollections} of {@link MatchResult.Metadata}. Configuration options for them are in

 * {@link MatchConfiguration} and include features such as treatment of filepatterns that don't

 * match anything and continuous incremental matching of filepatterns (watching for new files).

 *

 * <h3>Example: Watching a single filepattern for new files</h3>

 *

 * <p>This example matches a single filepattern repeatedly every 30 seconds, continuously returns

 * new matched files as an unbounded {@code PCollection<Metadata>} and stops if no new files appear

 * for 1 hour.

 *

 * <pre>{@code

 * PCollection<Metadata> matches = p.apply(FileIO.match()

 *     .filepattern("...")

 *     .continuously(

 *       Duration.standardSeconds(30), afterTimeSinceNewOutput(Duration.standardHours(1))));

 * }</pre>

 *

 * <h3>Example: Matching a PCollection of filepatterns arriving from Kafka</h3>

 *

 * <p>This example reads filepatterns from Kafka and matches each one as it arrives, producing again

 * an unbounded {@code PCollection<Metadata>}, and failing in case the filepattern doesn't match

 * anything.

 *

 * <pre>{@code

 * PCollection<String> filepatterns = p.apply(KafkaIO.read()...);

 *

 * PCollection<Metadata> matches = filepatterns.apply(FileIO.matchAll()

 *     .withEmptyMatchTreatment(DISALLOW));

 * }</pre>

 *

 * <h2>Reading files</h2>

 *

 * <p>{@link #readMatches} converts each result of {@link #match} or {@link #matchAll} to a {@link

 * ReadableFile} that is convenient for reading a file's contents, optionally decompressing it.

 *

 * <h3>Example: Returning filenames and contents of compressed files matching a filepattern</h3>

 *

 * <p>This example matches a single filepattern and returns {@code KVs} of filenames and their

 * contents as {@code String}, decompressing each file with GZIP.

 *

 * <pre>{@code

 * PCollection<KV<String, String>> filesAndContents = p

 *     .apply(FileIO.match().filepattern("hdfs://path/to/*.gz"))

 *     // withCompression can be omitted - by default compression is detected from the filename.

 *     .apply(FileIO.readMatches().withCompression(GZIP))

 *     .apply(MapElements

 *         // uses imports from TypeDescriptors

 *         .into(KVs(strings(), strings()))

 *         .via((ReadableFile f) -> KV.of(

 *             f.getMetadata().resourceId().toString(), f.readFullyAsUTF8String())));

 * }</pre>

 *

 * <h2>Writing files</h2>

 *

 * <p>{@link #write} and {@link #writeDynamic} write elements from a {@link PCollection} of a given

 * type to files, using a given {@link Sink} to write a set of elements to each file. The collection

 * can be bounded or unbounded - in either case, writing happens by default per window and pane, and

 * the amount of data in each window and pane is finite, so a finite number of files ("shards") are

 * written for each window and pane. There are several aspects to this process:

 *

 * <ul>

 *   <li><b>How many shards are generated per pane:</b> This is controlled by <i>sharding</i>, using

 *       {@link Write#withNumShards} or {@link Write#withSharding}. The default is runner-specific,

 *       so the number of shards will vary based on runner behavior, though at least 1 shard will

 *       always be produced for every non-empty pane. Note that setting a fixed number of shards can

 *       hurt performance: it adds an additional {@link GroupByKey} to the pipeline. However, it is

 *       required to set it when writing an unbounded {@link PCollection} due to <a

 *       href="https://issues.apache.org/jira/browse/BEAM-1438">BEAM-1438</a> and similar behavior

 *       in other runners.

 *   <li><b>How the shards are named:</b> This is controlled by a {@link Write.FileNaming}:

 *       filenames can depend on a variety of inputs, e.g. the window, the pane, total number of

 *       shards, the current file's shard index, and compression. Controlling the file naming is

 *       described in the section <i>File naming</i> below.

 *   <li><b>Which elements go into which shard:</b> Elements within a pane get distributed into

 *       different shards created for that pane arbitrarily, though {@link FileIO.Write} attempts to

 *       make shards approximately evenly sized. For more control over which elements go into which

 *       files, consider using <i>dynamic destinations</i> (see below).

 *   <li><b>How a given set of elements is written to a shard:</b> This is controlled by the {@link

 *       Sink}, e.g. {@link AvroIO#sink} will generate Avro files. The {@link Sink} controls the

 *       format of a single file: how to open a file, how to write each element to it, and how to

 *       close the file - but it does not control the set of files or which elements go where.

 *       Elements are written to a shard in an arbitrary order. {@link FileIO.Write} can

 *       additionally compress the generated files using {@link FileIO.Write#withCompression}.

 *   <li><b>How all of the above can be element-dependent:</b> This is controlled by <i>dynamic

 *       destinations</i>. It is possible to have different groups of elements use different

 *       policies for naming files and for configuring the {@link Sink}. See "dynamic destinations"

 *       below.

 * </ul>

 *

 * <h3>File naming</h3>

 *

 * <p>The names of generated files are produced by a {@link Write.FileNaming}. The default naming

 * strategy is to name files in the format: {@code

 * $prefix-$start-$end-$pane-$shard-of-$numShards$suffix$compressionSuffix}, where:

 *

 * <ul>

 *   <li>$prefix is set by {@link Write#withPrefix}, the default is "output".

 *   <li>$start and $end are boundaries of the window of data being written, formatted in ISO 8601

 *       format (YYYY-mm-ddTHH:MM:SSZZZ). The window is omitted in case this is the global window.

 *   <li>$pane is the index of the pane within the window. The pane is omitted in case it is known

 *       to be the only pane for this window.

 *   <li>$shard is the index of the current shard being written, out of the $numShards total shards

 *       written for the current pane. Both are formatted using 5 digits (or more if necessary

 *       according to $numShards) and zero-padded.

 *   <li>$suffix is set by {@link Write#withSuffix}, the default is empty.

 *   <li>$compressionSuffix is based on the default extension for the chosen {@link

 *       Write#withCompression compression type}.

 * </ul>

 *

 * <p>For example: {@code data-2017-12-01T19:00:00Z-2017-12-01T20:00:00Z-2-00010-of-00050.txt.gz}

 *

 * <p>Alternatively, one can specify a custom naming strategy using {@link

 * Write#withNaming(Write.FileNaming)}.

 *

 * <p>If {@link Write#to} is specified, then the filenames produced by the {@link Write.FileNaming}

 * are resolved relative to that directory.

 *

 * <p>When using dynamic destinations via {@link #writeDynamic} (see below), specifying a custom

 * naming strategy is required, using {@link Write#withNaming(SerializableFunction)} or {@link

 * Write#withNaming(Contextful)}. In those, pass a function that creates a {@link Write.FileNaming}

 * for the requested group ("destination"). You can either implement a custom {@link

 * Write.FileNaming}, or use {@link Write#defaultNaming} to configure the default naming strategy

 * with a prefix and suffix as per above.

 *

 * <h3>Dynamic destinations</h3>

 *

 * <p>If the elements in the input collection can be partitioned into groups that should be treated

 * differently, {@link FileIO.Write} supports different treatment per group ("destination"). It can

 * use different file naming strategies for different groups, and can differently configure the

 * {@link Sink}, e.g. write different elements to Avro files in different directories with different

 * schemas.

 *

 * <p>This feature is supported by {@link #writeDynamic}. Use {@link Write#by} to specify how to

 * partition the elements into groups ("destinations"). Then elements will be grouped by

 * destination, and {@link Write#withNaming(Contextful)} and {@link Write#via(Contextful)} will be

 * applied separately within each group, i.e. different groups will be written using the file naming

 * strategies returned by {@link Write#withNaming(Contextful)} and using sinks returned by {@link

 * Write#via(Contextful)} for the respective destinations. Note that currently sharding can not be

 * destination-dependent: every window/pane for every destination will use the same number of shards

 * specified via {@link Write#withNumShards} or {@link Write#withSharding}.

 *

 * <h3>Writing custom types to sinks</h3>

 *

 * <p>Normally, when writing a collection of a custom type using a {@link Sink} that takes a

 * different type (for example, writing a {@code PCollection<Event>} to a text-based {@code

 * Sink<String>}), one can simply apply a {@code ParDo} or {@code MapElements} to convert the custom

 * type to the sink's <i>output type</i>.

 *

 * <p>However, when using dynamic destinations, in many such cases the destination needs to be

 * extracted from the original type, so such a conversion is not possible. For example, one might

 * write events of a custom class {@code Event} to a text sink, using the event's "type" as a

 * destination. In that case, specify an <i>output function</i> in {@link Write#via(Contextful,

 * Contextful)} or {@link Write#via(Contextful, Sink)}.

 *

 * <h3>Example: Writing CSV files</h3>

 *

 * <pre>{@code

 * class CSVSink implements FileSink<List<String>> {

 *   private String header;

 *   private PrintWriter writer;

 *

 *   public CSVSink(List<String> colNames) {

 *     this.header = Joiner.on(",").join(colNames);

 *   }

 *

 *   public void open(WritableByteChannel channel) throws IOException {

 *     writer = new PrintWriter(Channels.newOutputStream(channel));

 *     writer.println(header);

 *   }

 *

 *   public void write(List<String> element) throws IOException {

 *     writer.println(Joiner.on(",").join(element));

 *   }

 *

 *   public void finish() throws IOException {

 *     writer.flush();

 *   }

 * }

 *

 * PCollection<BankTransaction> transactions = ...;

 * // Convert transactions to strings before writing them to the CSV sink.

 * transactions.apply(MapElements

 *         .into(lists(strings()))

 *         .via(tx -> Arrays.asList(tx.getUser(), tx.getAmount())))

 *     .apply(FileIO.<List<String>>write()

 *         .via(new CSVSink(Arrays.asList("user", "amount")))

 *         .to(".../path/to/")

 *         .withPrefix("transactions")

 *         .withSuffix(".csv"));

 * }</pre>

 *

 * <h3>Example: Writing CSV files to different directories and with different headers</h3>

 *

 * <pre>{@code

 * enum TransactionType {

 *   DEPOSIT,

 *   WITHDRAWAL,

 *   TRANSFER,

 *   ...

 *

 *   List<String> getFieldNames();

 *   List<String> getAllFields(BankTransaction tx);

 * }

 *

 * PCollection<BankTransaction> transactions = ...;

 * transactions.apply(FileIO.<TransactionType, BankTransaction>writeDynamic()

 *     .by(BankTransaction::getTypeName)

 *     .via(tx -> tx.getTypeName().getAllFields(tx),  // Convert the data to be written to CSVSink

 *          type -> new CSVSink(type.getFieldNames()))

 *     .to(".../path/to/")

 *     .withNaming(type -> defaultNaming(type + "-transactions", ".csv")));

 * }</pre>

 */

public class FileIO {

  /** Logger for this class; the matching code uses it to report how many files a pattern matched. */
  private static final Logger LOG = LoggerFactory.getLogger(FileIO.class);



  /**
   * Creates a transform that expands a single filepattern via {@link FileSystems#match} and emits
   * every matched resource (files as well as directories) as a {@link MatchResult.Metadata}.
   *
   * <p>The filepattern is matched exactly once by default, which yields a bounded {@link
   * PCollection}. Use {@link Match#continuously(Duration, TerminationCondition)} to keep watching
   * the filepattern for new matches; the resulting {@link PCollection} is then unbounded.
   *
   * <p>A filepattern that matches nothing is handled according to {@link
   * EmptyMatchTreatment#DISALLOW} unless overridden with {@link Match#withEmptyMatchTreatment}.
   *
   * <p>The emitted {@link MatchResult.Metadata} are deduplicated by filename: when the same
   * filename is observed several times with different metadata (e.g. because the file is still
   * growing), only the metadata from the first observation is emitted and later changes to that
   * file are ignored.
   */
  public static Match match() {
    MatchConfiguration defaultConfiguration =
        MatchConfiguration.create(EmptyMatchTreatment.DISALLOW);
    return new AutoValue_FileIO_Match.Builder().setConfiguration(defaultConfiguration).build();
  }



  /**
   * Variant of {@link #match} that takes a collection of filepatterns and matches each of them.
   *
   * <p>No deduplication happens across filepatterns: a resource matched by several filepatterns
   * is emitted once per matching filepattern.
   *
   * <p>A filepattern that matches nothing is handled according to {@link
   * EmptyMatchTreatment#ALLOW_IF_WILDCARD} unless overridden with {@link
   * MatchAll#withEmptyMatchTreatment}.
   */
  public static MatchAll matchAll() {
    MatchConfiguration defaultConfiguration =
        MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD);
    return new AutoValue_FileIO_MatchAll.Builder().setConfiguration(defaultConfiguration).build();
  }



  /**
   * Creates a transform that turns every result of {@link #match} or {@link #matchAll} into a
   * {@link ReadableFile}, through which the file's contents can be read and, if applicable,
   * decompressed.
   *
   * <p>Defaults: compression is {@link Compression#AUTO} and directories are handled with {@link
   * ReadMatches.DirectoryTreatment#SKIP}.
   */
  public static ReadMatches readMatches() {
    AutoValue_FileIO_ReadMatches.Builder builder = new AutoValue_FileIO_ReadMatches.Builder();
    return builder
        .setCompression(Compression.AUTO)
        .setDirectoryTreatment(ReadMatches.DirectoryTreatment.SKIP)
        .build();
  }



  /**
   * Creates a transform that writes elements to files through a {@link Sink}, without dynamic
   * destinations. See the class-level documentation for details.
   *
   * <p>Defaults: no compression, windowing is respected and spilling is enabled.
   */
  public static <InputT> Write<Void, InputT> write() {
    AutoValue_FileIO_Write.Builder<Void, InputT> builder = new AutoValue_FileIO_Write.Builder<>();
    return builder
        .setDynamic(false)
        .setCompression(Compression.UNCOMPRESSED)
        .setIgnoreWindowing(false)
        .setNoSpilling(false)
        .build();
  }



  /**
   * Creates a transform that writes elements to files through a {@link Sink}, grouping the
   * elements by "dynamic destinations". See the class-level documentation for details.
   *
   * <p>Defaults: no compression, windowing is respected and spilling is enabled.
   */
  public static <DestT, InputT> Write<DestT, InputT> writeDynamic() {
    AutoValue_FileIO_Write.Builder<DestT, InputT> builder = new AutoValue_FileIO_Write.Builder<>();
    return builder
        .setDynamic(true)
        .setCompression(Compression.UNCOMPRESSED)
        .setIgnoreWindowing(false)
        .setNoSpilling(false)
        .build();
  }



  /**
   * Handle on a single matched file: exposes its metadata and opens its contents, applying the
   * associated {@link Compression} when reading.
   */
  public static final class ReadableFile {
    private final MatchResult.Metadata metadata;
    private final Compression compression;

    /** Pairs a file's metadata with the compression to apply when the file is opened. */
    ReadableFile(MatchResult.Metadata metadata, Compression compression) {
      this.metadata = metadata;
      this.compression = compression;
    }

    /** Returns the {@link MatchResult.Metadata} describing this file. */
    public MatchResult.Metadata getMetadata() {
      return metadata;
    }

    /** Returns the {@link Compression} that {@link #open} uses to decompress this file. */
    public Compression getCompression() {
      return compression;
    }

    /**
     * Opens the file via {@link FileSystems#open} and returns a {@link ReadableByteChannel} over
     * its data, passed through the decompression of {@link #getCompression}.
     */
    public ReadableByteChannel open() throws IOException {
      ReadableByteChannel rawChannel = FileSystems.open(metadata.resourceId());
      return compression.readDecompressed(rawChannel);
    }

    /**
     * Same as {@link #open} but typed as a {@link SeekableByteChannel}. Fails the precondition
     * check when the file is not {@link MatchResult.Metadata#isReadSeekEfficient seekable}.
     */
    public SeekableByteChannel openSeekable() throws IOException {
      boolean seekable = getMetadata().isReadSeekEfficient();
      checkState(seekable, "The file %s is not seekable", metadata.resourceId());
      return (SeekableByteChannel) open();
    }

    /** Reads the whole file and returns its contents as a byte array. */
    public byte[] readFullyAsBytes() throws IOException {
      // The helper leaves the stream open; try-with-resources is what closes it.
      try (InputStream contents = Channels.newInputStream(open())) {
        return StreamUtils.getBytesWithoutClosing(contents);
      }
    }

    /** Reads the whole file and returns its contents decoded as a UTF-8 {@link String}. */
    public String readFullyAsUTF8String() throws IOException {
      byte[] contents = readFullyAsBytes();
      return new String(contents, StandardCharsets.UTF_8);
    }

    @Override
    public String toString() {
      return new StringBuilder("ReadableFile{metadata=")
          .append(metadata)
          .append(", compression=")
          .append(compression)
          .append('}')
          .toString();
    }

    @Override
    public boolean equals(Object o) {
      if (o == this) {
        return true;
      }
      // The class is final, so instanceof is equivalent to an exact class comparison.
      if (!(o instanceof ReadableFile)) {
        return false;
      }
      ReadableFile other = (ReadableFile) o;
      return Objects.equal(metadata, other.metadata) && compression == other.compression;
    }

    @Override
    public int hashCode() {
      return Objects.hashCode(metadata, compression);
    }
  }



  /**
   * Options shared by the matching transforms: how to treat filepatterns that match nothing
   * ({@link EmptyMatchTreatment}) and whether to keep watching for newly matching files.
   */
  @AutoValue
  public abstract static class MatchConfiguration implements HasDisplayData, Serializable {
    /** Builds a configuration with the given {@link EmptyMatchTreatment} and no watching. */
    public static MatchConfiguration create(EmptyMatchTreatment emptyMatchTreatment) {
      Builder builder = new AutoValue_FileIO_MatchConfiguration.Builder();
      return builder.setEmptyMatchTreatment(emptyMatchTreatment).build();
    }

    abstract EmptyMatchTreatment getEmptyMatchTreatment();

    /** Poll interval for continuous matching; {@code null} when not watching. */
    @Nullable
    abstract Duration getWatchInterval();

    /** Termination condition for continuous matching; {@code null} when not watching. */
    @Nullable
    abstract TerminationCondition<String, ?> getWatchTerminationCondition();

    abstract Builder toBuilder();

    @AutoValue.Builder
    abstract static class Builder {
      abstract Builder setEmptyMatchTreatment(EmptyMatchTreatment treatment);

      abstract Builder setWatchInterval(Duration watchInterval);

      abstract Builder setWatchTerminationCondition(TerminationCondition<String, ?> condition);

      abstract MatchConfiguration build();
    }

    /** Returns a copy of this configuration that uses the given {@link EmptyMatchTreatment}. */
    public MatchConfiguration withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
      Builder updated = toBuilder().setEmptyMatchTreatment(treatment);
      return updated.build();
    }

    /**
     * Returns a copy of this configuration that keeps looking for new files every {@code
     * interval} until {@code condition} is reached; the condition's input is the filepattern.
     */
    public MatchConfiguration continuously(
        Duration interval, TerminationCondition<String, ?> condition) {
      Builder updated = toBuilder();
      updated = updated.setWatchInterval(interval);
      updated = updated.setWatchTerminationCondition(condition);
      return updated.build();
    }

    @Override
    public void populateDisplayData(DisplayData.Builder builder) {
      DisplayData.Builder withTreatment =
          builder.add(
              DisplayData.item("emptyMatchTreatment", getEmptyMatchTreatment().toString())
                  .withLabel("Treatment of filepatterns that match no files"));
      // The interval is only set when watching, hence addIfNotNull.
      withTreatment.addIfNotNull(
          DisplayData.item("watchForNewFilesInterval", getWatchInterval())
              .withLabel("Interval to watch for new files"));
    }
  }



  /**
   * Implementation of {@link #match}.
   *
   * <p>Holds the filepattern to match together with a {@link MatchConfiguration}. At expansion
   * time the filepattern provider is turned into a collection and the actual matching is
   * delegated to {@link #matchAll} using the same configuration.
   */
  @AutoValue
  public abstract static class Match extends PTransform<PBegin, PCollection<MatchResult.Metadata>> {
    /** The filepattern to match; {@code null} until one of the {@code filepattern} setters ran. */
    @Nullable
    abstract ValueProvider<String> getFilepattern();

    abstract MatchConfiguration getConfiguration();

    abstract Builder toBuilder();

    @AutoValue.Builder
    abstract static class Builder {
      abstract Builder setFilepattern(ValueProvider<String> filepattern);

      abstract Builder setConfiguration(MatchConfiguration configuration);

      abstract Match build();
    }

    /** Matches the given filepattern. */
    public Match filepattern(String filepattern) {
      return this.filepattern(StaticValueProvider.of(filepattern));
    }

    /** Like {@link #filepattern(String)} but using a {@link ValueProvider}. */
    public Match filepattern(ValueProvider<String> filepattern) {
      return toBuilder().setFilepattern(filepattern).build();
    }

    /** Sets the {@link MatchConfiguration}. */
    public Match withConfiguration(MatchConfiguration configuration) {
      return toBuilder().setConfiguration(configuration).build();
    }

    /** See {@link MatchConfiguration#withEmptyMatchTreatment(EmptyMatchTreatment)}. */
    public Match withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
      return withConfiguration(getConfiguration().withEmptyMatchTreatment(treatment));
    }

    /**
     * See {@link MatchConfiguration#continuously}. The returned {@link PCollection} is unbounded.
     *
     * <p>This works only in runners supporting {@link Experimental.Kind#SPLITTABLE_DO_FN}.
     */
    @Experimental(Experimental.Kind.SPLITTABLE_DO_FN)
    public Match continuously(
        Duration pollInterval, TerminationCondition<String, ?> terminationCondition) {
      return withConfiguration(getConfiguration().continuously(pollInterval, terminationCondition));
    }

    /**
     * Expands into "create the filepattern" followed by {@link #matchAll} with this transform's
     * configuration.
     *
     * @throws IllegalArgumentException if no filepattern has been set on this transform
     */
    @Override
    public PCollection<MatchResult.Metadata> expand(PBegin input) {
      // The filepattern is the only required setting. Reject a transform that was applied
      // without it here, with an actionable message, instead of passing null further down.
      checkArgument(
          getFilepattern() != null,
          "FileIO.match() requires a filepattern; set it with filepattern(...)");
      return input
          .apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
          .apply("Via MatchAll", matchAll().withConfiguration(getConfiguration()));
    }

    @Override
    public void populateDisplayData(DisplayData.Builder builder) {
      super.populateDisplayData(builder);
      builder
          .addIfNotNull(
              DisplayData.item("filePattern", getFilepattern()).withLabel("Input File Pattern"))
          .include("configuration", getConfiguration());
    }
  }



  /**
   * Implementation of {@link #matchAll}.
   *
   * <p>Depending on the {@link MatchConfiguration}, each incoming filepattern is either matched
   * once or polled repeatedly; in both cases the matches are reshuffled before being returned.
   */
  @AutoValue
  public abstract static class MatchAll
      extends PTransform<PCollection<String>, PCollection<MatchResult.Metadata>> {
    abstract MatchConfiguration getConfiguration();

    abstract Builder toBuilder();

    @AutoValue.Builder
    abstract static class Builder {
      abstract Builder setConfiguration(MatchConfiguration configuration);

      abstract MatchAll build();
    }

    /** Like {@link Match#withConfiguration}. */
    public MatchAll withConfiguration(MatchConfiguration configuration) {
      Builder updated = toBuilder().setConfiguration(configuration);
      return updated.build();
    }

    /** Like {@link Match#withEmptyMatchTreatment}. */
    public MatchAll withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
      MatchConfiguration updated = getConfiguration().withEmptyMatchTreatment(treatment);
      return withConfiguration(updated);
    }

    /** Like {@link Match#continuously}. */
    @Experimental(Experimental.Kind.SPLITTABLE_DO_FN)
    public MatchAll continuously(
        Duration pollInterval, TerminationCondition<String, ?> terminationCondition) {
      MatchConfiguration watching =
          getConfiguration().continuously(pollInterval, terminationCondition);
      return withConfiguration(watching);
    }

    @Override
    public PCollection<MatchResult.Metadata> expand(PCollection<String> input) {
      MatchConfiguration configuration = getConfiguration();
      // A configured watch interval is what switches the transform into continuous mode.
      boolean watching = configuration.getWatchInterval() != null;
      PCollection<MatchResult.Metadata> matched =
          watching ? matchContinuously(input, configuration) : matchOnce(input, configuration);
      return matched.apply(Reshuffle.viaRandomKey());
    }

    /** Matches every filepattern a single time, honoring the configured empty-match treatment. */
    private static PCollection<MatchResult.Metadata> matchOnce(
        PCollection<String> filepatterns, MatchConfiguration configuration) {
      MatchFn matchFn = new MatchFn(configuration.getEmptyMatchTreatment());
      return filepatterns.apply("Match filepatterns", ParDo.of(matchFn));
    }

    /** Polls every filepattern at the configured interval until its termination condition. */
    private static PCollection<MatchResult.Metadata> matchContinuously(
        PCollection<String> filepatterns, MatchConfiguration configuration) {
      return filepatterns
          .apply(
              "Continuously match filepatterns",
              Watch.growthOf(
                      Contextful.of(new MatchPollFn(), Requirements.empty()),
                      new ExtractFilenameFn())
                  .withPollInterval(configuration.getWatchInterval())
                  .withTerminationPerInput(configuration.getWatchTerminationCondition()))
          .apply(Values.create());
    }

    @Override
    public void populateDisplayData(DisplayData.Builder builder) {
      super.populateDisplayData(builder);
      builder.include("configuration", getConfiguration());
    }

    /** Matches one filepattern and outputs the metadata of every match. */
    private static class MatchFn extends DoFn<String, MatchResult.Metadata> {
      private final EmptyMatchTreatment emptyMatchTreatment;

      public MatchFn(EmptyMatchTreatment emptyMatchTreatment) {
        this.emptyMatchTreatment = emptyMatchTreatment;
      }

      @ProcessElement
      public void process(ProcessContext c) throws Exception {
        String pattern = c.element();
        MatchResult result = FileSystems.match(pattern, emptyMatchTreatment);
        LOG.info("Matched {} files for pattern {}", result.metadata().size(), pattern);
        for (MatchResult.Metadata matched : result.metadata()) {
          c.output(matched);
        }
      }
    }

    /**
     * Poll function for continuous matching: reports the current matches of a filepattern as an
     * incomplete result whose timestamp and watermark are both the time of the poll. Empty
     * matches are always allowed here.
     */
    private static class MatchPollFn extends PollFn<String, MatchResult.Metadata> {
      @Override
      public Watch.Growth.PollResult<MatchResult.Metadata> apply(String element, Context c)
          throws Exception {
        Instant pollTime = Instant.now();
        return Watch.Growth.PollResult.incomplete(
                pollTime, FileSystems.match(element, EmptyMatchTreatment.ALLOW).metadata())
            .withWatermark(pollTime);
      }
    }

    /** Maps a match to the string form of its resource id. */
    private static class ExtractFilenameFn
        implements SerializableFunction<MatchResult.Metadata, String> {
      @Override
      public String apply(MatchResult.Metadata input) {
        return input.resourceId().toString();
      }
    }
  }



  /** Implementation of {@link #readMatches}. */

  @AutoValue

  public abstract static class ReadMatches

      extends PTransform<PCollection<MatchResult.Metadata>, PCollection<ReadableFile>> {

    /**
     * Enum to control how directories are handled when a matched resource turns out to be a
     * directory rather than a file.
     */

    public enum DirectoryTreatment {

      /** Directories are dropped: no {@link ReadableFile} is emitted for them. */
      SKIP,

      /** Encountering a directory fails with an {@link IllegalArgumentException}. */
      PROHIBIT

    }



    /**
     * Compression used to read matched files. {@link Compression#AUTO} means the compression is
     * detected per file from its filename.
     */
    abstract Compression getCompression();



    /** How matched resources that are directories are handled. */
    abstract DirectoryTreatment getDirectoryTreatment();



    /** Returns a builder pre-populated with this transform's settings. */
    abstract Builder toBuilder();



    /** AutoValue builder for {@link ReadMatches}; used by the {@code with*} methods. */
    @AutoValue.Builder

    abstract static class Builder {

      /** Sets the compression used to read matched files. */
      abstract Builder setCompression(Compression compression);



      /** Sets how matched directories are handled. */
      abstract Builder setDirectoryTreatment(DirectoryTreatment directoryTreatment);



      /** Builds the configured {@link ReadMatches} transform. */
      abstract ReadMatches build();

    }



    /**
     * Returns a copy of this transform that reads files with the given {@link Compression}. When
     * not called, {@link Compression#AUTO} is used.
     */
    public ReadMatches withCompression(Compression compression) {
      boolean hasCompression = compression != null;
      checkArgument(hasCompression, "compression can not be null");
      return toBuilder().setCompression(compression).build();
    }



    /**
     * Returns a copy of this transform that handles directories found in the input {@link
     * PCollection} as specified. When not called, {@link DirectoryTreatment#SKIP} is used.
     */
    public ReadMatches withDirectoryTreatment(DirectoryTreatment directoryTreatment) {
      boolean hasTreatment = directoryTreatment != null;
      checkArgument(hasTreatment, "directoryTreatment can not be null");
      return toBuilder().setDirectoryTreatment(directoryTreatment).build();
    }



    @Override

    public PCollection<ReadableFile> expand(PCollection<MatchResult.Metadata> input) {

      return input.apply(ParDo.of(new ToReadableFileFn(this)));

    }



    @Override

    public void populateDisplayData(DisplayData.Builder builder) {

      builder.add(DisplayData.item("compression", getCompression().toString()));

      builder.add(DisplayData.item("directoryTreatment", getDirectoryTreatment().toString()));

    }



    private static class ToReadableFileFn extends DoFn<MatchResult.Metadata, ReadableFile> {

      private final ReadMatches spec;



      private ToReadableFileFn(ReadMatches spec) {

        this.spec = spec;

      }



      @ProcessElement

      public void process(ProcessContext c) {

        MatchResult.Metadata metadata =c.element();

        if (metadata.resourceId().isDirectory()) {

          switch (spec.getDirectoryTreatment()) {

            case SKIP:

              return;



            case PROHIBIT:

              throw new IllegalArgumentException(

                  "Trying to read " + metadata.resourceId() + " which is a directory");



            default:

              throw new UnsupportedOperationException(

                  "Unknown DirectoryTreatment: " + spec.getDirectoryTreatment());

          }

        }



        Compression compression =

            (spec.getCompression() == Compression.AUTO)

                ? Compression.detect(metadata.resourceId().getFilename())

                : spec.getCompression();

        c.output(

            new ReadableFile(

                MatchResult.Metadata.builder()

                    .setResourceId(metadata.resourceId())

                    .setSizeBytes(metadata.sizeBytes())

                    .setLastModifiedMillis(metadata.lastModifiedMillis())

                    .setIsReadSeekEfficient(

                        metadata.isReadSeekEfficient() && compression == Compression.UNCOMPRESSED)

                    .build(),

                compression));

      }

    }

  }



  /**

   * Specifies how to write elements to individual files in {@link FileIO#write} and {@link

   * FileIO#writeDynamic}. A new instance of {@link Sink} is created for every file being written.

   *

   * @param <ElementT> the type of the elements passed to {@link #write}

   */

  public interface Sink<ElementT> extends Serializable {

    /**

     * Initializes writing to the given channel. Will be invoked once on a given {@link Sink}

     * instance.

     *

     * @param channel the channel that the file's contents are written to

     * @throws IOException if the sink fails to initialize

     */

    void open(WritableByteChannel channel) throws IOException;



    /**

     * Appends a single element to the file. May be invoked zero or more times.

     *

     * @param element the element to append

     * @throws IOException if the element can not be written

     */

    void write(ElementT element) throws IOException;



    /**

     * Flushes the buffered state (if any) before the channel is closed. Does not need to close the

     * channel. Will be invoked once.

     *

     * @throws IOException if the buffered state can not be flushed

     */

    void flush() throws IOException;

  }



  /** Implementation of {@link #write} and {@link #writeDynamic}. */

  @AutoValue

  @Experimental(Experimental.Kind.SOURCE_SINK)

  public abstract static class Write<DestinationT, UserT>

      extends PTransform<PCollection<UserT>, WriteFilesResult<DestinationT>> {

    /** A policy for generating names for shard files. */

    public interface FileNaming extends Serializable {

      /**

       * Generates the filename. MUST use each argument and return different values for each

       * combination of the arguments.

       *

       * @param window the window passed by the caller for the file being named

       * @param pane the pane passed by the caller for the file being named

       * @param numShards the total number of shards

       * @param shardIndex the index of the shard being named

       * @param compression the compression of the file being named

       * @return the generated filename

       */

      String getFilename(

		  BoundedWindow window,

		  PaneInfo pane,

		  int numShards,

		  int shardIndex,

		  Compression compression);

    }



    /**
     * Like {@link #defaultNaming(ValueProvider, ValueProvider)}, but takes the prefix and suffix as
     * plain strings.
     */
    public static FileNaming defaultNaming(final String prefix, final String suffix) {
      ValueProvider<String> prefixProvider = StaticValueProvider.of(prefix);
      ValueProvider<String> suffixProvider = StaticValueProvider.of(suffix);
      return defaultNaming(prefixProvider, suffixProvider);
    }



    /**
     * Builds the default {@link FileNaming}: the name is assembled from the prefix, the window, the
     * pane index, the shard index and shard count, the suffix, and the compression's suggested
     * suffix, with non-empty leading parts joined by {@code "-"}.
     *
     * <p>The window part is left out for the {@link GlobalWindow}; any other window must be an
     * {@link IntervalWindow}. The pane index is left out when the pane is both the first and the
     * last firing.
     */
    public static FileNaming defaultNaming(
        final ValueProvider<String> prefix, final ValueProvider<String> suffix) {
      return (window, pane, numShards, shardIndex, compression) -> {
        checkArgument(window != null, "window can not be null");
        checkArgument(pane != null, "pane can not be null");
        checkArgument(compression != null, "compression can not be null");

        StringBuilder name = new StringBuilder(prefix.get());

        // Window component: "<start>-<end>", only for non-global windows.
        if (window != GlobalWindow.INSTANCE) {
          appendDashIfNotEmpty(name);
          checkArgument(
              window instanceof IntervalWindow,
              "defaultNaming() supports only windows of type %s, but got window %s of type %s",
              IntervalWindow.class.getSimpleName(),
              window,
              window.getClass().getSimpleName());
          IntervalWindow interval = (IntervalWindow) window;
          name.append(interval.start().toString());
          name.append("-");
          name.append(interval.end().toString());
        }

        // Pane component: only when this is not the sole firing of the pane.
        if (!pane.isFirst() || !pane.isLast()) {
          appendDashIfNotEmpty(name);
          name.append(pane.getIndex());
        }

        appendDashIfNotEmpty(name);

        // Shard component: zero-padded to at least 5 digits, or to the width of numShards.
        // A trillion shards per window per pane ought to be enough for everybody.
        int width = Math.max(5, String.valueOf(numShards).length());
        DecimalFormat shardFormat = new DecimalFormat("000000000000".substring(0, width));
        name.append(shardFormat.format(shardIndex))
            .append("-of-")
            .append(shardFormat.format(numShards))
            .append(suffix.get())
            .append(compression.getSuggestedSuffix());
        return name.toString();
      };
    }

    /** Appends the {@code "-"} separator unless nothing has been written to {@code name} yet. */
    private static void appendDashIfNotEmpty(StringBuilder name) {
      if (name.length() > 0) {
        name.append("-");
      }
    }



    /**
     * Wraps {@code innerNaming} so that each filename it generates is resolved as a file relative
     * to {@code baseDirectory}.
     */
    public static FileNaming relativeFileNaming(
        final ValueProvider<String> baseDirectory, final FileNaming innerNaming) {
      return (window, pane, numShards, shardIndex, compression) -> {
        ResourceId directory =
            FileSystems.matchNewResource(baseDirectory.get(), true /* isDirectory */);
        String relativeName =
            innerNaming.getFilename(window, pane, numShards, shardIndex, compression);
        return directory.resolve(relativeName, RESOLVE_FILE).toString();
      };
    }



    /** True for writeDynamic()-style usage, per the validation in expand(). */

    abstract boolean getDynamic();



    /** Creates the {@link Sink} for a destination; set by {@link #via}. */

    @Nullable

    abstract Contextful<Fn<DestinationT, Sink<?>>> getSinkFn();



    /** Maps an input element to the value handed to the {@link Sink}; set by {@link #via}. */

    @Nullable

    abstract Contextful<Fn<UserT, ?>> getOutputFn();



    /** Maps an input element to its destination; set by {@link #by}. */

    @Nullable

    abstract Contextful<Fn<UserT, DestinationT>> getDestinationFn();



    /** Directory for generated files; set by {@link #to}. */

    @Nullable

    abstract ValueProvider<String> getOutputDirectory();



    /** Prefix for the default file naming; set by {@link #withPrefix}. */

    @Nullable

    abstract ValueProvider<String> getFilenamePrefix();



    /** Suffix for the default file naming; set by {@link #withSuffix}. */

    @Nullable

    abstract ValueProvider<String> getFilenameSuffix();



    /** Destination-independent naming; set by {@link #withNaming(FileNaming)}. */

    @Nullable

    abstract FileNaming getConstantFileNaming();



    /** Destination-dependent naming; set by the function-taking {@code withNaming} overloads. */

    @Nullable

    abstract Contextful<Fn<DestinationT, FileNaming>> getFileNamingFn();



    /** Set by {@link #withEmptyGlobalWindowDestination}. */

    @Nullable

    abstract DestinationT getEmptyWindowDestination();



    /** Set by {@link #withDestinationCoder}; otherwise resolved in expand(). */

    @Nullable

    abstract Coder<DestinationT> getDestinationCoder();



    /** Set by {@link #withTempDirectory}; expand() falls back to the output directory. */

    @Nullable

    abstract ValueProvider<String> getTempDirectory();



    /** Compression of the generated files; set by {@link #withCompression}. */

    abstract Compression getCompression();



    /** Fixed number of shards; {@code null} means none was requested. */

    @Nullable

    abstract ValueProvider<Integer> getNumShards();



    /** Transform computing the number of shards; set by {@link #withSharding}. */

    @Nullable

    abstract PTransform<PCollection<UserT>, PCollectionView<Integer>> getSharding();



    /** Set to true by {@link #withIgnoreWindowing}. */

    abstract boolean getIgnoreWindowing();



    /** Set to true by {@link #withNoSpilling}. */

    abstract boolean getNoSpilling();



    /** Returns a builder for creating a modified copy of this transform. */

    abstract Builder<DestinationT, UserT> toBuilder();



    /** AutoValue builder for {@link Write}; one setter per abstract property getter. */

    @AutoValue.Builder

    abstract static class Builder<DestinationT, UserT> {

      abstract Builder<DestinationT, UserT> setDynamic(boolean dynamic);



      abstract Builder<DestinationT, UserT> setSinkFn(Contextful<Fn<DestinationT, Sink<?>>> sink);



      abstract Builder<DestinationT, UserT> setOutputFn(Contextful<Fn<UserT, ?>> outputFn);



      abstract Builder<DestinationT, UserT> setDestinationFn(

          Contextful<Fn<UserT, DestinationT>> destinationFn);



      abstract Builder<DestinationT, UserT> setOutputDirectory(

          ValueProvider<String> outputDirectory);



      abstract Builder<DestinationT, UserT> setFilenamePrefix(ValueProvider<String> filenamePrefix);



      abstract Builder<DestinationT, UserT> setFilenameSuffix(ValueProvider<String> filenameSuffix);



      abstract Builder<DestinationT, UserT> setConstantFileNaming(FileNaming constantFileNaming);



      abstract Builder<DestinationT, UserT> setFileNamingFn(

          Contextful<Fn<DestinationT, FileNaming>> namingFn);



      abstract Builder<DestinationT, UserT> setEmptyWindowDestination(

          DestinationT emptyWindowDestination);



      abstract Builder<DestinationT, UserT> setDestinationCoder(

          Coder<DestinationT> destinationCoder);



      abstract Builder<DestinationT, UserT> setTempDirectory(

          ValueProvider<String> tempDirectoryProvider);



      abstract Builder<DestinationT, UserT> setCompression(Compression compression);



      // withNumShards(int) passes null here when called with 0.

      abstract Builder<DestinationT, UserT> setNumShards(

          @Nullable ValueProvider<Integer> numShards);



      abstract Builder<DestinationT, UserT> setSharding(

          PTransform<PCollection<UserT>, PCollectionView<Integer>> sharding);



      abstract Builder<DestinationT, UserT> setIgnoreWindowing(boolean ignoreWindowing);



      abstract Builder<DestinationT, UserT> setNoSpilling(boolean noSpilling);



      abstract Write<DestinationT, UserT> build();

    }



    /** Sets the function that assigns each element to a group ("destination"). */
    public Write<DestinationT, UserT> by(SerializableFunction<UserT, DestinationT> destinationFn) {
      checkArgument(destinationFn != null, "destinationFn can not be null");
      Contextful<Fn<UserT, DestinationT>> contextfulFn = fn(destinationFn);
      return by(contextfulFn);
    }

    /** Like {@link #by}, but the function can access context such as side inputs. */
    public Write<DestinationT, UserT> by(Contextful<Fn<UserT, DestinationT>> destinationFn) {
      checkArgument(destinationFn != null, "destinationFn can not be null");
      Builder<DestinationT, UserT> updated = toBuilder().setDestinationFn(destinationFn);
      return updated.build();
    }



    /**

     * Specifies how to create a {@link Sink} for a particular destination and how to map the

     * element type to the sink's output type. The sink function must create a new {@link Sink}

     * instance every time it is called.

     *

     * @param outputFn maps an input element to the value written to the sink

     * @param sinkFn creates the sink for a destination

     */

    public <OutputT> Write<DestinationT, UserT> via(

        Contextful<Fn<UserT, OutputT>> outputFn,

        Contextful<Fn<DestinationT, Sink<OutputT>>> sinkFn) {

      checkArgument(sinkFn != null, "sinkFn can not be null");

      checkArgument(outputFn != null, "outputFn can not be null");

      // The raw cast drops OutputT: the builder stores the sink function with element type Sink<?>.

      // NOTE(review): the call order in this chain looks load-bearing for compilation, since

      // outputFn is not a Contextful<Fn<UserT, ?>> - confirm before reordering.

      return toBuilder().setSinkFn((Contextful) sinkFn).setOutputFn(outputFn).build();

    }



    /** Like {@link #via(Contextful, Contextful)}, but uses the same sink for all destinations. */

    public <OutputT> Write<DestinationT, UserT> via(

		Contextful<Fn<UserT, OutputT>> outputFn, final Sink<OutputT> sink) {

      checkArgument(sink != null, "sink can not be null");

      checkArgument(outputFn != null, "outputFn can not be null");

      return via(outputFn, fn(SerializableFunctions.clonesOf(sink)));

    }



    /**

     * Like {@link #via(Contextful, Contextful)}, but the output type of the sink is the same as the

     * type of the input collection. The sink function must create a new {@link Sink} instance every

     * time it is called.

     */

    public Write<DestinationT, UserT> via(Contextful<Fn<DestinationT, Sink<UserT>>> sinkFn) {

      checkArgument(sinkFn != null, "sinkFn can not be null");

      // The output function is the identity, so elements reach the sink unchanged.

      return toBuilder()

          .setSinkFn((Contextful) sinkFn)

          .setOutputFn(fn(SerializableFunctions.<UserT>identity()))

          .build();

    }



    /** Like {@link #via(Contextful)}, but uses the same {@link Sink} for all destinations. */

    public Write<DestinationT, UserT> via(Sink<UserT> sink) {

      checkArgument(sink != null, "sink can not be null");

      return via(fn(SerializableFunctions.clonesOf(sink)));

    }



    /**
     * Sets the directory shared by all generated files. Unless {@link #withTempDirectory} is also
     * specified, a temporary generated sub-directory of this directory serves as the temp
     * directory.
     */
    public Write<DestinationT, UserT> to(String directory) {
      checkArgument(directory != null, "directory can not be null");
      ValueProvider<String> provider = StaticValueProvider.of(directory);
      return to(provider);
    }

    /** Same as {@link #to(String)}, taking the directory as a {@link ValueProvider}. */
    public Write<DestinationT, UserT> to(ValueProvider<String> directory) {
      checkArgument(directory != null, "directory can not be null");
      Builder<DestinationT, UserT> updated = toBuilder().setOutputDirectory(directory);
      return updated.build();
    }



    /**
     * Sets the prefix shared by all generated filenames when the default file naming is in use.
     * Can not be combined with {@link #withNaming}.
     */
    public Write<DestinationT, UserT> withPrefix(String prefix) {
      checkArgument(prefix != null, "prefix can not be null");
      ValueProvider<String> provider = StaticValueProvider.of(prefix);
      return withPrefix(provider);
    }

    /** Same as {@link #withPrefix(String)}, taking the prefix as a {@link ValueProvider}. */
    public Write<DestinationT, UserT> withPrefix(ValueProvider<String> prefix) {
      checkArgument(prefix != null, "prefix can not be null");
      Builder<DestinationT, UserT> updated = toBuilder().setFilenamePrefix(prefix);
      return updated.build();
    }



    /**
     * Sets the suffix shared by all generated filenames when the default file naming is in use.
     * Can not be combined with {@link #withNaming}.
     */
    public Write<DestinationT, UserT> withSuffix(String suffix) {
      checkArgument(suffix != null, "suffix can not be null");
      ValueProvider<String> provider = StaticValueProvider.of(suffix);
      return withSuffix(provider);
    }

    /** Same as {@link #withSuffix(String)}, taking the suffix as a {@link ValueProvider}. */
    public Write<DestinationT, UserT> withSuffix(ValueProvider<String> suffix) {
      checkArgument(suffix != null, "suffix can not be null");
      Builder<DestinationT, UserT> updated = toBuilder().setFilenameSuffix(suffix);
      return updated.build();
    }



    /**
     * Sets a custom strategy for generating filenames. If a directory was given via {@link #to},
     * the generated filenames are resolved relative to it.
     *
     * <p>Can not be combined with {@link #withPrefix} or {@link #withSuffix}.
     *
     * <p>Only valid together with {@link #write()}, not with {@link #writeDynamic()}.
     */
    public Write<DestinationT, UserT> withNaming(FileNaming naming) {
      checkArgument(naming != null, "naming can not be null");
      Builder<DestinationT, UserT> updated = toBuilder().setConstantFileNaming(naming);
      return updated.build();
    }

    /**
     * Sets a custom, destination-dependent strategy for generating filenames; otherwise similar to
     * {@link #withNaming(FileNaming)}.
     *
     * <p>Only valid together with {@link #writeDynamic()}, not with {@link #write()}.
     */
    public Write<DestinationT, UserT> withNaming(
        SerializableFunction<DestinationT, FileNaming> namingFn) {
      checkArgument(namingFn != null, "namingFn can not be null");
      Contextful<Fn<DestinationT, FileNaming>> contextfulFn = fn(namingFn);
      return withNaming(contextfulFn);
    }

    /**
     * Same as {@link #withNaming(SerializableFunction)}, but the function can access context such
     * as side inputs.
     */
    public Write<DestinationT, UserT> withNaming(
        Contextful<Fn<DestinationT, FileNaming>> namingFn) {
      checkArgument(namingFn != null, "namingFn can not be null");
      Builder<DestinationT, UserT> updated = toBuilder().setFileNamingFn(namingFn);
      return updated.build();
    }



    /** Sets the directory in which all temporary files are placed. */
    public Write<DestinationT, UserT> withTempDirectory(String tempDirectory) {
      checkArgument(tempDirectory != null, "tempDirectory can not be null");
      ValueProvider<String> provider = StaticValueProvider.of(tempDirectory);
      return withTempDirectory(provider);
    }

    /** Same as {@link #withTempDirectory(String)}, taking a {@link ValueProvider}. */
    public Write<DestinationT, UserT> withTempDirectory(ValueProvider<String> tempDirectory) {
      checkArgument(tempDirectory != null, "tempDirectory can not be null");
      Builder<DestinationT, UserT> updated = toBuilder().setTempDirectory(tempDirectory);
      return updated.build();
    }



    /**
     * Compresses all generated shard files with the given {@link Compression} and, by default,
     * appends the matching extension to the filename. {@link Compression#AUTO} is rejected.
     */
    public Write<DestinationT, UserT> withCompression(Compression compression) {
      checkArgument(compression != null, "compression can not be null");
      checkArgument(
          compression != Compression.AUTO, "AUTO compression is not supported for writing");
      Builder<DestinationT, UserT> updated = toBuilder().setCompression(compression);
      return updated.build();
    }



    /**
     * When {@link #withIgnoreWindowing()} is in effect, sets the destination used to generate the
     * (only, empty) output file in case the collection is empty.
     */
    public Write<DestinationT, UserT> withEmptyGlobalWindowDestination(
        DestinationT emptyWindowDestination) {
      Builder<DestinationT, UserT> updated =
          toBuilder().setEmptyWindowDestination(emptyWindowDestination);
      return updated.build();
    }



    /**
     * Sets the {@link Coder} for the destination type, for cases where it can not be inferred from
     * {@link #by}.
     */
    public Write<DestinationT, UserT> withDestinationCoder(Coder<DestinationT> destinationCoder) {
      checkArgument(destinationCoder != null, "destinationCoder can not be null");
      Builder<DestinationT, UserT> updated = toBuilder().setDestinationCoder(destinationCoder);
      return updated.build();
    }



    /**
     * Uses the given fixed number of shards per window; 0 selects runner-determined sharding. A
     * non-zero value may hurt performance, because it limits the parallelism of writing and
     * introduces an extra {@link GroupByKey} operation.
     */
    public Write<DestinationT, UserT> withNumShards(int numShards) {
      checkArgument(numShards >= 0, "numShards must be non-negative, but was: %s", numShards);
      // Zero is represented as an absent (null) provider.
      ValueProvider<Integer> provider = null;
      if (numShards != 0) {
        provider = StaticValueProvider.of(numShards);
      }
      return withNumShards(provider);
    }

    /**
     * Same as {@link #withNumShards(int)}, taking a {@link ValueProvider}. Passing {@code null}
     * selects runner-determined sharding.
     */
    public Write<DestinationT, UserT> withNumShards(@Nullable ValueProvider<Integer> numShards) {
      Builder<DestinationT, UserT> updated = toBuilder().setNumShards(numShards);
      return updated.build();
    }



    /**
     * Sets the {@link PTransform} that computes the desired number of shards in each window.
     */
    public Write<DestinationT, UserT> withSharding(
        PTransform<PCollection<UserT>, PCollectionView<Integer>> sharding) {
      checkArgument(sharding != null, "sharding can not be null");
      Builder<DestinationT, UserT> updated = toBuilder().setSharding(sharding);
      return updated.build();
    }



    /**
     * Discards the windowing information of the input and rewindows it into the global window with
     * the default trigger.
     *
     * @deprecated Avoid usage of this method: its effects are complex and it will be removed in
     *     future versions of Unified. Right now it exists for compatibility with {@link WriteFiles}.
     */
    @Deprecated
    public Write<DestinationT, UserT> withIgnoreWindowing() {
      Builder<DestinationT, UserT> updated = toBuilder().setIgnoreWindowing(true);
      return updated.build();
    }



    /** Enables the no-spilling mode; see {@link WriteFiles#withNoSpilling()}. */
    public Write<DestinationT, UserT> withNoSpilling() {
      Builder<DestinationT, UserT> updated = toBuilder().setNoSpilling(true);
      return updated.build();
    }



    /**
     * Validates the naming-related settings and folds them into a single function from destination
     * to {@link FileNaming}.
     *
     * <p>In dynamic mode the function given to {@code withNaming()} is required, and a constant
     * naming, prefix or suffix is rejected. Otherwise the constant naming is used if set, else
     * {@link #defaultNaming} with the configured prefix (default {@code "output"}) and suffix
     * (default empty). In both modes the result is made relative to the output directory when one
     * is set.
     *
     * @return the resolved naming function
     * @throws IllegalArgumentException if the naming settings are inconsistent with the mode
     */
    @VisibleForTesting
    Contextful<Fn<DestinationT, FileNaming>> resolveFileNamingFn() {
      if (getDynamic()) {
        checkArgument(
            getConstantFileNaming() == null,
            "when using writeDynamic(), must use versions of .withNaming() "
                + "that take functions from DestinationT");
        checkArgument(getFilenamePrefix() == null, ".withPrefix() requires write()");
        checkArgument(getFilenameSuffix() == null, ".withSuffix() requires write()");
        checkArgument(
            getFileNamingFn() != null,
            "when using writeDynamic(), must specify "
                + ".withNaming() taking a function from DestinationT");
        return fn(
            (element, c) -> {
              FileNaming naming = getFileNamingFn().getClosure().apply(element, c);
              return getOutputDirectory() == null
                  ? naming
                  : relativeFileNaming(getOutputDirectory(), naming);
            },
            getFileNamingFn().getRequirements());
      } else {
        checkArgument(
            getFileNamingFn() == null,
            ".withNaming() taking a function from DestinationT requires writeDynamic()");
        FileNaming constantFileNaming;
        if (getConstantFileNaming() == null) {
          constantFileNaming =
              defaultNaming(
                  MoreObjects.firstNonNull(getFilenamePrefix(), StaticValueProvider.of("output")),
                  MoreObjects.firstNonNull(getFilenameSuffix(), StaticValueProvider.of("")));
        } else {
          // Each message names the setting that was actually checked.
          checkArgument(
              getFilenamePrefix() == null,
              ".withNaming(FileNaming) is incompatible with .withPrefix()");
          checkArgument(
              getFilenameSuffix() == null,
              ".withNaming(FileNaming) is incompatible with .withSuffix()");
          constantFileNaming = getConstantFileNaming();
        }
        if (getOutputDirectory() != null) {
          constantFileNaming = relativeFileNaming(getOutputDirectory(), constantFileNaming);
        }
        return fn(SerializableFunctions.<DestinationT, FileNaming>constant(constantFileNaming));
      }
    }



    /**
     * Validates this spec, builds a fully resolved copy of it, and applies the corresponding
     * {@link WriteFiles} transform to {@code input}.
     */
    @Override
    public WriteFilesResult<DestinationT> expand(PCollection<UserT> input) {
      Write.Builder<DestinationT, UserT> resolvedSpec = new AutoValue_FileIO_Write.Builder<>();

      final boolean dynamic = getDynamic();
      resolvedSpec.setDynamic(dynamic);

      checkArgument(getSinkFn() != null, ".via() is required");
      resolvedSpec.setSinkFn(getSinkFn());

      checkArgument(getOutputFn() != null, "outputFn should have been set by .via()");
      resolvedSpec.setOutputFn(getOutputFn());

      // Destination function and coder: user-supplied when dynamic, constant null/void otherwise.
      if (!dynamic) {
        checkArgument(getDestinationFn() == null, ".by() requires writeDynamic()");
        checkArgument(
            getDestinationCoder() == null, ".withDestinationCoder() requires writeDynamic()");
        resolvedSpec.setDestinationFn(fn(SerializableFunctions.constant(null)));
        resolvedSpec.setDestinationCoder((Coder) VoidCoder.of());
      } else {
        checkArgument(getDestinationFn() != null, "when using writeDynamic(), .by() is required");
        resolvedSpec.setDestinationFn(getDestinationFn());
        resolvedSpec.setDestinationCoder(resolveDestinationCoder(input));
      }

      resolvedSpec.setFileNamingFn(resolveFileNamingFn());
      resolvedSpec.setEmptyWindowDestination(getEmptyWindowDestination());

      // The temp directory falls back to the output directory when not given explicitly.
      ValueProvider<String> tempDirectory = getTempDirectory();
      if (tempDirectory == null) {
        tempDirectory = getOutputDirectory();
        checkArgument(tempDirectory != null, "must specify either .withTempDirectory() or .to()");
      }
      resolvedSpec.setTempDirectory(tempDirectory);

      final ValueProvider<Integer> numShards = getNumShards();
      final PTransform<PCollection<UserT>, PCollectionView<Integer>> sharding = getSharding();
      resolvedSpec.setCompression(getCompression());
      resolvedSpec.setNumShards(numShards);
      resolvedSpec.setSharding(sharding);
      resolvedSpec.setIgnoreWindowing(getIgnoreWindowing());
      resolvedSpec.setNoSpilling(getNoSpilling());

      Write<DestinationT, UserT> resolved = resolvedSpec.build();
      WriteFiles<UserT, DestinationT, ?> writeFiles =
          WriteFiles.to(new ViaFileBasedSink<>(resolved))
              .withSideInputs(Lists.newArrayList(resolved.getAllSideInputs()));

      // A fixed shard count wins over a sharding transform; with neither, the runner decides.
      if (numShards != null) {
        writeFiles = writeFiles.withNumShards(numShards);
      } else if (sharding != null) {
        writeFiles = writeFiles.withSharding(sharding);
      } else {
        writeFiles = writeFiles.withRunnerDeterminedSharding();
      }
      if (!getIgnoreWindowing()) {
        writeFiles = writeFiles.withWindowedWrites();
      }
      if (getNoSpilling()) {
        writeFiles = writeFiles.withNoSpilling();
      }
      return input.apply(writeFiles);
    }



    /**
     * Returns the destination coder: the explicitly configured one if present, otherwise the coder
     * that the pipeline's coder registry provides for the output type of the destination function.
     *
     * @param input the input collection, used to reach the pipeline's coder registry
     * @return the coder to use for destinations
     * @throws IllegalArgumentException if no coder is configured and none can be inferred; the
     *     underlying {@link CannotProvideCoderException} is attached as the cause
     */
    private Coder<DestinationT> resolveDestinationCoder(PCollection<UserT> input) {
      Coder<DestinationT> destinationCoder = getDestinationCoder();
      if (destinationCoder == null) {
        TypeDescriptor<DestinationT> destinationT =
            TypeDescriptors.outputOf(getDestinationFn().getClosure());
        try {
          destinationCoder = input.getPipeline().getCoderRegistry().getCoder(destinationT);
        } catch (CannotProvideCoderException e) {
          // Keep the original exception as the cause so the registry's explanation is not lost.
          throw new IllegalArgumentException(
              "Unable to infer a coder for destination type (inferred from .by() as \""
                  + destinationT
                  + "\") - specify it explicitly using .withDestinationCoder()",
              e);
        }
      }
      return destinationCoder;
    }



    /**
     * Returns the side inputs required by the destination, output, sink and file naming functions
     * combined.
     */
    private Collection<PCollectionView<?>> getAllSideInputs() {
      Requirements combined =
          Requirements.union(getDestinationFn(), getOutputFn(), getSinkFn(), getFileNamingFn());
      return combined.getSideInputs();
    }



    /**
     * A {@link FileBasedSink} driven by a {@link Write} spec: writers delegate to the {@link Sink}
     * produced by the spec's sink function, and destinations, record formatting and filenames come
     * from the spec via {@link DynamicDestinationsAdapter}.
     */
    private static class ViaFileBasedSink<UserT, DestinationT, OutputT>
        extends FileBasedSink<UserT, DestinationT, OutputT> {
      private final Write<DestinationT, UserT> spec;

      private ViaFileBasedSink(Write<DestinationT, UserT> spec) {
        super(
            ValueProvider.NestedValueProvider.of(
                spec.getTempDirectory(),
                tempDir -> FileSystems.matchNewResource(tempDir, true /* isDirectory */)),
            new DynamicDestinationsAdapter<>(spec),
            spec.getCompression());
        this.spec = spec;
      }

      @Override
      public WriteOperation<DestinationT, OutputT> createWriteOperation() {
        return new WriteOperation<DestinationT, OutputT>(this) {
          @Override
          public Writer<DestinationT, OutputT> createWriter() throws Exception {
            return new Writer<DestinationT, OutputT>(this, "") {
              // Assigned in prepareWrite().
              @Nullable private Sink<OutputT> sink;

              @Override
              protected void prepareWrite(WritableByteChannel channel) throws Exception {
                // Side inputs are served by the sink's dynamic destinations.
                Fn.Context sideInputContext =
                    new Fn.Context() {
                      @Override
                      public <T> T sideInput(PCollectionView<T> view) {
                        return getWriteOperation()
                            .getSink()
                            .getDynamicDestinations()
                            .sideInput(view);
                      }
                    };
                Fn<DestinationT, Sink<OutputT>> createSink = (Fn) spec.getSinkFn().getClosure();
                sink = createSink.apply(getDestination(), sideInputContext);
                sink.open(channel);
              }

              @Override
              public void write(OutputT value) throws Exception {
                sink.write(value);
              }

              @Override
              protected void finishWrite() throws Exception {
                sink.flush();
              }
            };
          }
        };
      }

      /** Exposes the functions of a {@link Write} spec through {@link DynamicDestinations}. */
      private static class DynamicDestinationsAdapter<UserT, DestinationT, OutputT>
          extends DynamicDestinations<UserT, DestinationT, OutputT> {
        private final Write<DestinationT, UserT> spec;

        // Created on first use by getContext().
        @Nullable private transient Fn.Context context;

        private DynamicDestinationsAdapter(Write<DestinationT, UserT> spec) {
          this.spec = spec;
        }

        /** Returns the context that forwards side input lookups to this object. */
        private Fn.Context getContext() {
          if (context != null) {
            return context;
          }
          context =
              new Fn.Context() {
                @Override
                public <T> T sideInput(PCollectionView<T> view) {
                  return DynamicDestinationsAdapter.this.sideInput(view);
                }
              };
          return context;
        }

        @Override
        public OutputT formatRecord(UserT record) {
          try {
            Fn<UserT, OutputT> outputFn = (Fn<UserT, OutputT>) spec.getOutputFn().getClosure();
            return outputFn.apply(record, getContext());
          } catch (Exception e) {
            throw new RuntimeException(e);
          }
        }

        @Override
        public DestinationT getDestination(UserT element) {
          try {
            Fn<UserT, DestinationT> destinationFn = spec.getDestinationFn().getClosure();
            return destinationFn.apply(element, getContext());
          } catch (Exception e) {
            throw new RuntimeException(e);
          }
        }

        @Override
        public DestinationT getDefaultDestination() {
          return spec.getEmptyWindowDestination();
        }

        @Override
        public FilenamePolicy getFilenamePolicy(final DestinationT destination) {
          final FileNaming naming;
          try {
            naming = spec.getFileNamingFn().getClosure().apply(destination, getContext());
          } catch (Exception e) {
            throw new RuntimeException(e);
          }
          return new FilenamePolicy() {
            @Override
            public ResourceId windowedFilename(
                int shardNumber,
                int numShards,
                BoundedWindow window,
                PaneInfo paneInfo,
                OutputFileHints outputFileHints) {
              // We ignore outputFileHints because it will always be the same as
              // spec.getCompression() because we control the FileBasedSink.
              String filename =
                  naming.getFilename(
                      window, paneInfo, numShards, shardNumber, spec.getCompression());
              return FileSystems.matchNewResource(filename, false /* isDirectory */);
            }

            @Nullable
            @Override
            public ResourceId unwindowedFilename(
                int shardNumber, int numShards, OutputFileHints outputFileHints) {
              // Unwindowed files are named as global-window files without a firing.
              String filename =
                  naming.getFilename(
                      GlobalWindow.INSTANCE,
                      PaneInfo.NO_FIRING,
                      numShards,
                      shardNumber,
                      spec.getCompression());
              return FileSystems.matchNewResource(filename, false /* isDirectory */);
            }
          };
        }

        @Override
        public List<PCollectionView<?>> getSideInputs() {
          Collection<PCollectionView<?>> views = spec.getAllSideInputs();
          return Lists.newArrayList(views);
        }

        @Nullable
        @Override
        public Coder<DestinationT> getDestinationCoder() {
          return spec.getDestinationCoder();
        }
      }
    }

  }

}